Qu'est-ce que l'asynchrone ? La programmation asynchrone permet d'exécuter plusieurs tâches en attente (I/O, requêtes réseau, etc.) simultanément sans bloquer le programme, contrairement au code synchrone classique.
1. Synchrone vs Asynchrone
| Synchrone | Asynchrone |
| Les tâches s'exécutent une par une | Plusieurs tâches peuvent être en attente |
| Une tâche lente bloque tout le programme | Pendant l'attente, d'autres tâches s'exécutent |
| Plus simple à comprendre et déboguer | Plus efficace pour les opérations I/O |
| Moins efficace pour beaucoup d'opérations lentes | Idéal pour API, bases de données, fichiers |
import time
def tache_lente(nom):
print(f"Début de {nom}")
time.sleep(2)
print(f"Fin de {nom}")
return f"Résultat de {nom}"
debut = time.time()
resultat1 = tache_lente("Tâche 1")
resultat2 = tache_lente("Tâche 2")
print(f"Temps total: {time.time() - debut} secondes")
import asyncio
async def tache_lente_async(nom):
print(f"Début de {nom}")
await asyncio.sleep(2)
print(f"Fin de {nom}")
return f"Résultat de {nom}"
async def main():
debut = time.time()
resultats = await asyncio.gather(
tache_lente_async("Tâche 1"),
tache_lente_async("Tâche 2")
)
print(f"Temps total: {time.time() - debut} secondes")
print(resultats)
asyncio.run(main())
2. Mots-clés async/await
async def ma_coroutine():
return "Résultat"
coro = ma_coroutine()
async def utilisateur():
resultat = await ma_coroutine()
print(resultat)
asyncio.run(utilisateur())
Règles importantes :
- Une fonction async doit être appelée avec
await ou asyncio.run()
await ne peut être utilisé que dans une fonction async
- Les fonctions async retournent un objet
coroutine, pas la valeur directement
- Ne pas oublier
await avant les opérations asynchrones (sinon la coroutine ne s'exécute pas)
3. Exécution concurrente avec asyncio
import asyncio
async def tache(numero, duree):
print(f"Tâche {numero} démarre (durée {duree}s)")
await asyncio.sleep(duree)
print(f"Tâche {numero} terminée")
return f"Résultat {numero}"
async def exemples_concurrents():
resultats = await asyncio.gather(
tache(1, 2),
tache(2, 1),
tache(3, 3)
)
print("Résultats gather:", resultats)
tache1 = asyncio.create_task(tache(4, 2))
tache2 = asyncio.create_task(tache(5, 1))
await tache1
await tache2
taches = [tache(6, 2), tache(7, 1)]
termine, en_attente = await asyncio.wait(taches, return_when=asyncio.FIRST_COMPLETED)
print("Première tâche terminée")
4. Timeouts et délais
import asyncio
async def operation_lente():
print("Début opération lente...")
await asyncio.sleep(5)
return "Opération réussie"
async def avec_timeout():
try:
resultat = await asyncio.wait_for(operation_lente(), timeout=2)
print(resultat)
except asyncio.TimeoutError:
print("L'opération a pris trop de temps!")
async def avec_timeout_partiel():
try:
async with asyncio.timeout(2):
await asyncio.sleep(1)
print("Cette partie s'exécute")
await asyncio.sleep(2)
except TimeoutError:
print("Timeout déclenché")
5. Synchronisation entre tâches
import asyncio
verrou = asyncio.Lock()
compteur_partage = 0
async def incrementer_avec_verrou():
global compteur_partage
async with verrou:
valeur = compteur_partage
await asyncio.sleep(0.1)
compteur_partage = valeur + 1
evenement = asyncio.Event()
async def attendre_signal():
print("En attente du signal...")
await evenement.wait()
print("Signal reçu!")
async def envoyer_signal():
await asyncio.sleep(2)
evenement.set()
print("Signal envoyé")
queue = asyncio.Queue()
async def producteur():
for i in range(5):
await queue.put(f"Donnée {i}")
await asyncio.sleep(0.5)
async def consommateur():
while True:
donnee = await queue.get()
print(f"Traitement: {donnee}")
6. Applications pratiques
import aiohttp
import asyncio
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def fetch_multiple():
urls = [
"https://api.github.com",
"https://httpbin.org/get",
"https://jsonplaceholder.typicode.com/posts/1"
]
async with aiohttp.ClientSession() as session:
taches = [fetch_url(session, url) for url in urls]
resultats = await asyncio.gather(*taches)
for url, contenu in zip(urls, resultats):
print(f"{url}: {len(contenu)} caractères")
import aiofiles
import asyncio
async def lire_fichier_async(chemin):
async with aiofiles.open(chemin, mode='r') as fichier:
contenu = await fichier.read()
return contenu
async def lire_plusieurs(fichiers):
taches = [lire_fichier_async(f) for f in fichiers]
resultats = await asyncio.gather(*taches)
return resultats
Bonnes pratiques :
- ✔️ Utilisez
asyncio.run() comme point d'entrée unique
- ✔️ Préférez
asyncio.gather() pour exécuter plusieurs coroutines
- ✔️ Évitez les opérations bloquantes (time.sleep, requests.get) dans le code async
- ✔️ Utilisez
asyncio.to_thread() pour les opérations CPU-intensives
- ✔️ Ne mélangez pas code synchrone et asynchrone dans la même fonction
- ✔️ Testez avec
pytest-asyncio
Attention : La programmation asynchrone n'accélère pas les opérations CPU-intensives. Pour cela, utilisez le multiprocessing ou concurrent.futures. L'asynchrone est idéal pour les I/O (réseau, disque, bases de données).